package club.monkeywood.ad.dmp.report

import club.monkeywood.ad.dmp.bean.Log
import club.monkeywood.ad.dmp.util.RptUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


/**
	* 媒体报表分析
	*     使用广播变量从app字典文件中查询应用名称
	*     app id对应的app name
	*/
object AppAnalyseRpt extends App{

	if (args.length != 3) {
		println(
			"""
				|cn.dmp.report.AppAnalyseRpt
				|参数：
				| 输入路径
				| 字典文件路径
				| 输出路径
			""".stripMargin)
		sys.exit()
	}

	val Array(inputPath, dictFilePath, outputPath) = args

	// 2 创建sparkconf->SparkSession
	val sparkConf = new SparkConf()
	sparkConf.setAppName(s"${this.getClass.getSimpleName}")
	sparkConf.setMaster("local[*]")
	//使用KryoSerializer更快
	// RDD 序列化到磁盘 worker与worker之间的数据传输
	sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

	val ss = SparkSession.builder()
		.appName(s"${this.getClass.getSimpleName}")
		.config(sparkConf)
		.getOrCreate()

	// 字典文件
	// 注意：一定要collect，否则使用广播变量时，只能获取到广播变量的部分数据（切片数据）
	val dictMap: Map[String, String] = ss.sparkContext.textFile(dictFilePath)
														.map(line => {
															val fields = line.split("\t", -1)
															(fields(4), fields(1))
														}).collect().toMap

	// 将字典数据广播executor
	val broadcast = ss.sparkContext.broadcast(dictMap)

	// 读取日志数据
	ss.sparkContext.textFile(inputPath)
		.map(_.split(",", -1))
		.filter(_.length >=85)
		.map(Log(_)).filter(log => !log.appid.isEmpty || !log.appname.isEmpty)
		.map(log => {
			var newAppName = log.appname
			if(!StringUtils.isNotEmpty(newAppName)) {
				newAppName = broadcast.value.getOrElse(log.appid, "未知")
			}

			val req = RptUtils.caculateReq(log.requestmode, log.processnode)
			val rtb = RptUtils.caculateRtb(log.iseffective, log.isbilling,log.isbid, log.adorderid, log.iswin, log.winprice, log.adpayment)
			val showClick = RptUtils.caculateShowClick(log.requestmode, log.iseffective)

			(newAppName, req++rtb++showClick)
		}).reduceByKey((list1, list2) => {
		list1.zip(list2).map(t => t._1 + t._2)
	}).map(t => t._1+","+t._2.mkString(","))
		.saveAsTextFile(outputPath)

	ss.stop()
}
